DevIOの新着記事がメディアポリシーに準拠しているかVertex AIで自動レビューしてみた
リテールアプリ共創部@大阪の岩田です。
先日以下のブログを書きました。
せっかくなのでコレのGoogle Cloud版もやってみようと思います。
構成
今回作成するシステムの概要です。ざっくりこんな構成を作ります。
- Cloud Schedulerを利用してCloud Run Functionsを起動
- DevIOの新着記事のURLを取得してPub/Subに送信する
- Pub/SubからCloud Run Functionsをトリガー
- 新着記事のURLから記事の本文を取得し、Vertex AIにレビューしてもらう
- レビュー結果をSlackに通知する
環境
今回利用した環境は以下の通りです
- Python: 3.12
- beautifulsoup4: 4.12.3
- feedparser:6.0.11
- google-cloud-pubsub: 2.23.1
- google-cloud-secret-manager: 2.20.2
- markdownify: 0.13.1
- requests:2.32.3
- slack-sdk: 3.32.0
- vertexai: 1.66.0
実装
ここからは実装を紹介していきます
新着記事の一覧を取得するCloud Run Functions
前回の記事同様にこちらの記事で紹介されていたコードを流用させてもらっています
from concurrent.futures import wait
from datetime import datetime, timedelta, timezone
import feedparser
import functions_framework
from google.cloud import pubsub_v1
import os
JST = timezone(timedelta(hours=+9))
feed_url = 'https://dev.classmethod.jp/feed/'
def get_feed_entries():
updated_since = datetime.now(JST) - timedelta(hours=1)
feed = feedparser.parse(feed_url)
new_entries = [
entry for entry in feed.entries
if datetime(*entry.updated_parsed[:6], tzinfo=timezone.utc)
.astimezone(JST) > updated_since
]
return new_entries
@functions_framework.http
def main(request):
project_id = os.environ['PROJECT_ID']
topic_name = os.environ['TOPIC_NAME']
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
new_entries = get_feed_entries()
futures = []
for entry in new_entries:
futures.append(publisher.publish(topic_path, entry['link'].encode()))
res = wait(futures)
print(res)
return 'OK'
記事の内容をレビューしてもらうCloud Run Functions
続いて記事の内容をレビューしてもらうCloud Run Functionsです。
import base64
from bs4 import BeautifulSoup
from cloudevents.http import CloudEvent
import functions_framework
from google.cloud import secretmanager
from markdownify import markdownify
import os
from slack_sdk import WebClient
import requests
import vertexai
from vertexai.generative_models import GenerativeModel, SafetySetting
system_instruction = """
あなたは企業ブログのレビュワーです
ブログ内に不適切な表現がないかチェックする必要があります。
...略
"""
def send_slack(project_id, url, review_result):
sm_client = secretmanager.SecretManagerServiceClient()
name = f"projects/{project_id}/secrets/blog-auto-review-slack-bot-token/versions/latest"
response = sm_client.access_secret_version(name=name)
slack_token = response.payload.data.decode("UTF-8")
slack_channel = os.environ["SLACK_CHANNEL_ID"]
slack_client = WebClient(token=slack_token)
print(review_result)
slack_client.chat_postMessage(
channel=slack_channel,
blocks=[
{
"type": "section",
"text": {"type": "mrkdwn", "text": "以下のブログをレビューしました"},
},
{"type": "section", "text": {"type": "mrkdwn", "text": url}},
{"type": "divider"},
{
"type": "section",
"text": {"type": "mrkdwn", "text": review_result},
},
{"type": "divider"},
],
)
@functions_framework.cloud_event
def main(cloud_event: CloudEvent) -> None:
url = base64.b64decode(cloud_event.data["message"]["data"]).decode()
res = requests.get(url)
soup = BeautifulSoup(res.text, "html.parser")
article = soup.find("article")
md_article = markdownify(article.prettify())
project_id = os.environ["PROJECT_ID"]
vertexai.init(project=project_id, location="us-central1")
model = GenerativeModel(
"gemini-1.5-flash-001", system_instruction=[system_instruction]
)
generation_config = {
"max_output_tokens": 8192,
"temperature": 1,
"top_p": 0.95,
}
safety_settings = [
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
]
review_content = f"""
以下ブログのレビューお願いします
```
{md_article}
```
"""
review_result = model.generate_content(
[review_content],
generation_config=generation_config,
safety_settings=safety_settings,
)
send_slack(project_id, url, review_result.text)
return review_result.text
ざっくりした処理の流れは以下の通りです。
- 記事のURLをもとに記事の内容を取得
- 取得した記事の内容をVertex AIでレビューしてもらう
- レビュー結果をSlackで通知
モデルは以下の部分でgemini-1.5-flash-001
を指定しています。
model = GenerativeModel(
"gemini-1.5-flash-001", system_instruction=[system_instruction]
)
諸々のパラメータは以下の部分で指定していますが、これはVertex AI Studioのチャットを利用した際のデフォルト値をそのまま引っ張ってきました。
generation_config = {
"max_output_tokens": 8192,
"temperature": 1,
"top_p": 0.95,
}
safety_settings = [
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
SafetySetting(
category=SafetySetting.HarmCategory.HARM_CATEGORY_HARASSMENT,
threshold=SafetySetting.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
),
]
これはマネコンのコードを取得
をクリックすると取得できるコードになっています。
リソースをデプロイするCDK TFのコード
リソースのデプロイにはCDK TFを利用しました。
まず設定値を取得するgetConfig
という処理です。今回は簡略化のためにオンコードで設定を記述する形にしています。実際の案件で利用する場合はCDK TFのコンテキストや環境変数を利用しましょう
export type Config = {
projectId: string
location: string
slackChannelId: string
}
export const getConfig = (): Config => {
return {
projectId: 'Google CloudのプロジェクトID',
location: 'asia-northeast1',
slackChannelId: 'レビュー結果を通知するSlackチャンネルのID'
}
}
続いてCloud Run Functionsをデプロイするためのカスタムコンストラクトです。このカスタムコンストラクト内でソースコードをZIPファイルに圧縮してStorageバケットにアップロードしています。
import { DataArchiveFile } from '@cdktf/provider-archive/lib/data-archive-file';
import {
Cloudfunctions2Function,
Cloudfunctions2FunctionEventTrigger,
} from '@cdktf/provider-google/lib/cloudfunctions2-function';
import { StorageBucketObject } from '@cdktf/provider-google/lib/storage-bucket-object';
import { Construct } from 'constructs';
import path = require('path');
export type CloudRunFunctionsProps = {
functionDir: string;
functionName?: string;
srcBucketName: string;
location: string;
projectId: string;
environmentVariables?: {
[key: string]: string;
}
eventTrigger?: Cloudfunctions2FunctionEventTrigger;
}
export class CloudFunctions extends Construct {
public readonly CloudRunFunction: Cloudfunctions2Function
constructor(scope: Construct, id: string, props: CloudRunFunctionsProps) {
super(scope, id);
const { eventTrigger, functionName, functionDir, srcBucketName, projectId } = props;
const code = new DataArchiveFile(this, 'archive_file', {
type: 'zip',
sourceDir: path.resolve(__dirname, '..', '..', 'functions', functionDir),
outputPath: path.resolve(
__dirname,
'..',
'..',
'cdktf.out',
'functions',
'out',
`${functionName}.zip`,
),
excludes: ['.venv', '__pycache__'],
});
const srcObj = new StorageBucketObject(this, 'source_object', {
name: code.outputMd5,
bucket: srcBucketName,
source: code.outputPath,
});
const environmentVariables: {
[key: string]: string;
} = {
PROJECT_ID: projectId,
...props.environmentVariables,
};
this.CloudRunFunction = new Cloudfunctions2Function(this, 'Default', {
name: functionName?? functionDir,
location: props.location,
buildConfig: {
entryPoint: 'main',
runtime: 'python312',
source: {
storageSource: {
bucket: srcBucketName,
object: srcObj.name,
},
},
},
serviceConfig: {
availableCpu: '1',
availableMemory: '1024M',
environmentVariables
},
eventTrigger
});
}
}
メインになるCDK TFのスタックを生成するコードは以下の通りです。スタックのコンストラクタ内でSecret Managerのシークレットも作成しています。シークレットにはignoreChanges
を指定しており、シークレット値はTerraform管理外としています。デプロイ完了後に手動で新しいシークレットバージョンを設定するのを忘れないようにしてください。
import { Construct } from "constructs";
import { TerraformStack } from "cdktf";
import * as google from "@cdktf/provider-google";
import { RandomProvider } from "@cdktf/provider-random/lib/provider";
import { Config } from "../config";
import { StorageBucket } from "@cdktf/provider-google/lib/storage-bucket";
import { SecretManagerSecret } from "@cdktf/provider-google/lib/secret-manager-secret";
import { SecretManagerSecretVersion } from "@cdktf/provider-google/lib/secret-manager-secret-version";
import { ArchiveProvider } from "@cdktf/provider-archive/lib/provider";
import { CloudFunctions } from "../constructs/cloud-run-functions";
import { PubsubTopic } from "@cdktf/provider-google/lib/pubsub-topic";
import { CloudSchedulerJob } from "@cdktf/provider-google/lib/cloud-scheduler-job";
import { ServiceAccount } from "@cdktf/provider-google/lib/service-account";
import { ProjectIamMember } from "@cdktf/provider-google/lib/project-iam-member";
type LineAudienceStackProps = Config;
export class BlogAutoReviewStack extends TerraformStack {
constructor(scope: Construct, id: string, props: LineAudienceStackProps) {
super(scope, id);
const { projectId, slackChannelId, location } = props;
new google.provider.GoogleProvider(this, "google_provider", {
project: projectId,
});
new RandomProvider(this, "random_provider");
new ArchiveProvider(this, "archive_provider");
const srcBucket = new StorageBucket(this, "src_bucket", {
name: "blog-autoreview-src-bucket",
location,
uniformBucketLevelAccess: true,
});
const pubSubTopic = new PubsubTopic(this, "new_blog", {
name: "blog-auto-review-new-blog",
messageRetentionDuration: "600s",
});
const checkFeedFuncName = "blog-auto-review-check-feed";
new CloudFunctions(this, "blog_feed", {
functionDir: "check-feed",
functionName: checkFeedFuncName,
srcBucketName: srcBucket.name,
location,
projectId,
environmentVariables: {
TOPIC_NAME: pubSubTopic.name,
},
});
new CloudFunctions(this, "auto_review", {
functionDir: "auto-review",
functionName: "blog-auto-review",
srcBucketName: srcBucket.name,
location,
projectId: projectId,
environmentVariables: {
TOPIC_NAME: pubSubTopic.name,
SLACK_CHANNEL_ID: slackChannelId,
},
eventTrigger: {
eventType: "google.cloud.pubsub.topic.v1.messagePublished",
pubsubTopic: pubSubTopic.id,
triggerRegion: props.location,
retryPolicy: "RETRY_POLICY_RETRY",
},
});
const slackBotTokenSecret = new SecretManagerSecret(
this,
"slack_bot_token",
{
secretId: "blog-auto-review-slack-bot-token",
replication: {
auto: {},
},
},
);
new SecretManagerSecretVersion(this, "slack_bot_token_version", {
lifecycle: {
ignoreChanges: "all",
},
secret: slackBotTokenSecret.id,
secretData:
"手動で新しいバージョンを作成してSlackBot用のトークンを設定して下さい",
});
const feedCheckInvokerSa = new ServiceAccount(this, "feed_check_invoker", {
accountId: "feed-check-invoker",
displayName: "Feed Check Invoker Service Account",
});
new ProjectIamMember(this, "run_invoker", {
project: projectId,
role: "roles/run.invoker",
member: feedCheckInvokerSa.member,
});
new CloudSchedulerJob(this, "review_trigger", {
project: projectId,
schedule: "0 * * * *",
timeZone: "Asia/Tokyo",
name: "blog-auto-review-scheduler",
region: location,
httpTarget: {
uri: `https://${location}-${projectId}.cloudfunctions.net/${checkFeedFuncName}`,
httpMethod: "POST",
oidcToken: {
serviceAccountEmail: feedCheckInvokerSa.email,
},
},
});
}
}
実行結果
CDK TFのコードがデプロイできたらレビュー結果がSlackに通知されるのまでしばらく待ちましょう。
うまく通知されてきました。
この記述は、Amplify Gen 2でのJavaScriptリゾルバー導入が「VTLの代替」というニュアンスになっています
という指摘を受けていますが、公式ドキュメントでも以下のように記載されており、VTLリゾルバーよりもJavaScriptリゾルバーの利用が推奨されているため、このままの記述で特に問題は無さそうです。
We now primarily support the APPSYNC_JS runtime and its documentation. Please consider using the APPSYNC_JS runtime and its guides here
その他の指摘についても同様で、生成AIは常に100%正しい回答を返してくれるわけではありません。あくまでレビューの取っ掛かりとして利用し、最終的には人間が判断するのが重要です。
まとめ
Google Cloudを使ってDevIOの記事レビューを自動化してみました。
今回紹介したコード一式は以下のGitHubリポジトリで公開しているので、よければ参考にしてみてください。